package cn.ccccltd.waf.message.base.provider;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import cn.ccccltd.waf.message.constant.MessageConstants;
import cn.ccccltd.waf.message.model.MessageCommon;
import cn.ccccltd.waf.message.model.result.Data;
import cn.ccccltd.waf.message.model.result.MessageResult;
import cn.ccccltd.waf.message.model.vo.GeneralMessage;
import cn.ccccltd.waf.message.thread.QueueSendThread;
import cn.ccccltd.waf.message.thread.threadlocal.MessageContext;
import cn.ccccltd.waf.message.util.JsonUtils;
import cn.ccccltd.waf.message.util.QueueUtils;
import cn.ccccltd.waf.message.util.sql.SqlContext;
import cn.ccccltd.waf.message.util.sql.SqlTools;

/**
 * 创建日期:2017年11月1日
 * Title:生产者必须的接口
 * Description：对本文件的详细描述，原则上不能少于50字
 * @author yangjingjiang
 * @mender：（文件的修改者，文件创建者之外的人）
 * @version 1.0
 * Remark：认为有必要的其他信息
 */
@Component
public  abstract class   BaseMessageService <T extends MessageCommon> {

	private static Logger log = LoggerFactory.getLogger(BaseMessageService.class);
	
	@Autowired
	private JdbcTemplate jdbcTemplate;
	
	/**
	 * 功能: 发送消息的统一接口<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月1日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param content
	 * @return
	 */
	public Object sendMessage(JSONObject content) {
		
		return null;
	}
	
	/**
	 * 功能: 自动适配线程池发送还是单线程发送消息<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月17日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param msgListQueue
	 * @param groupSize
	 * @return
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public MessageResult sendMessageByThread(List<T> msgListQueue,int groupSize) {

		long currentTimeMillis = System.currentTimeMillis();
		
		MessageResult result = new MessageResult();

		//总共发送条数
		int queueSize = 0;
		//线程数
		int threadSize = 0;
		
		if (msgListQueue.size()<groupSize+1) {
			
			threadSize = 1;
			for (T t : msgListQueue) {
				QueueUtils.sendMessageToQueue(getJmsTemplate(), JsonUtils.toObject(JsonUtils.toJson(t), JSONObject.class));
				queueSize++;
			}
			
		} else {
			
			//创建虚拟线程池，不使用后则会被回收
			ExecutorService executorService = new ThreadPoolExecutor(0, 1000,
	                60L, TimeUnit.SECONDS,
	                new SynchronousQueue<Runnable>());  
			List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();  
			
			//Apache的自动分组，线程分组
			List<List<T>> partition = ListUtils.partition(msgListQueue, groupSize);
			
			//设置日志线程记录的大小
			threadSize = partition.size();
			
			for (List<T> list : partition) {
				
				Future<Integer> future = executorService.submit(new QueueSendThread(list, getJmsTemplate()));
				resultList.add(future);
				
			}
			
		    try {
		        //线程池创建完毕，执行任务
		    	executorService.shutdown();  
		   
		        // (所有的任务都结束的时候，返回TRUE)  
		        if (!executorService.awaitTermination(60*1000, TimeUnit.MILLISECONDS)) {  
		            // 超时的时候向线程池中所有的线程发出中断(interrupted)。  
		        	executorService.shutdownNow();  
		        }
		        
		    } catch (InterruptedException e) {  
		        // awaitTermination方法被中断的时候也中止线程池中全部的线程的执行。  
		        executorService.shutdownNow();  
		    }
		    
			for (Future<Integer> future : resultList) {
				try {
					queueSize = queueSize + future.get();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			
			
		}
		
		long currentTimeMillisEnd = System.currentTimeMillis();
		
		result.setSuccess("true");
		result.setStateCode("200");
		result.setMsg("队列发送成功！"+queueSize+"条，"+(currentTimeMillisEnd-currentTimeMillis)+"毫秒，"+threadSize+"个线程");
		
		log.info(MessageConstants.LOG_DEBUG_PREFIX + "队列发送成功！"+queueSize+"条，"+(currentTimeMillisEnd-currentTimeMillis)+"毫秒，"+threadSize+"个线程!");
		
		return result;
	
	}
	
	/**
	 * 功能: 查询消息的接口<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月11日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param params
	 * @return
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public Object queryMessage(Map<String,String> params) {
		
		Class entityClass = getEntityClass();
		
		String tableName = getTableName();
		
		String selectSql = SqlTools.getSelectAllProperty(tableName, entityClass);
		
		List queryForList = jdbcTemplate.query(selectSql, new BeanPropertyRowMapper(entityClass));
		
		Data dt = new Data<>();
		String sqlCount = SqlTools.getCountSql(tableName);
		dt.setTotal((Long)queryCountMessage(sqlCount));
		dt.setRows(queryForList);
		
		MessageResult rs = new MessageResult("true", "200", "加载数据成功！", dt);
		
		return rs;
	}
	
	/**
	 * 功能: 统计条数<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月11日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param sql
	 * @return
	 */
	public Long queryCountMessage(String sql) {
		
		Long queryCount = jdbcTemplate.queryForObject(sql, Long.class);
		
		return queryCount;
	}
	
	/**
	 * 功能: 统计消息数据<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月11日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param params
	 * @return
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public Object queryStatMessage(Map<String,String> params) {
		
		String tableName = getTableName();
		
		String selectSql = "SELECT id, send_systemid, create_time,business_module,send_userid,send_time,category, count("+MessageContext.getValue(MessageConstants.CATEGORY)+"_id) as total, common_id, sum(case when send_state = '00' then 1 else 0 end) as wait_num,sum(case when send_state = '100' then 1 else 0 end) as success_num,sum(case when send_state = '201' then 1 else 0 end) as retry_num,sum(case when send_state = '200' then 1 else 0 end) as fail_num, sum(case when send_state = '300' then 1 else 0 end) as canl_num   FROM "+tableName+" WHERE 1=1 GROUP BY common_id";
		
		List<GeneralMessage> queryList = jdbcTemplate.query(selectSql, new BeanPropertyRowMapper(GeneralMessage.class) );
		
		Data<GeneralMessage> dt = new Data<>();
		String sqlCount = "select count(*) from (SELECT COUNT(*) FROM "+tableName+"  WHERE 1=1 GROUP BY common_id) sum_table";
		dt.setTotal((Long)queryCountMessage(sqlCount));
		dt.setRows(queryList);
		
		MessageResult rs = new MessageResult("true", "200", "加载数据成功！", dt);
		
		return rs;
	}
	
	/**
	 * 功能: 根据系统名id统计查询<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月20日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param params
	 * @return
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public Object queryStatMessageBySystemid(Map<String,String> params) {
		
		String tableName = getTableName();
		String selectSql = "SELECT send_systemid, category, count("+MessageContext.getValue(MessageConstants.CATEGORY)+"_id) as total, sum(case when send_state = '00' then 1 else 0 end) as wait_num,sum(case when send_state = '100' then 1 else 0 end) as success_num,sum(case when send_state = '201' then 1 else 0 end) as retry_num,sum(case when send_state = '200' then 1 else 0 end) as fail_num, sum(case when send_state = '300' then 1 else 0 end) as canl_num   FROM "+tableName+" WHERE 1=1 GROUP BY send_systemid";

		List<GeneralMessage> queryList = jdbcTemplate.query(selectSql, new BeanPropertyRowMapper(GeneralMessage.class) );
		
		Data<GeneralMessage> dt = new Data<>();
		String sqlCount = "select count(*) from (SELECT COUNT(*) FROM "+tableName+"  WHERE 1=1 GROUP BY send_systemid) sum_table";
		dt.setTotal((Long)queryCountMessage(sqlCount));
		dt.setRows(queryList);
		
		MessageResult rs = new MessageResult("true", "200", "加载数据成功！", dt);
		
		return rs;
	}
	
	/**
	 * 功能: 根据业务模块统计查询<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月20日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param params
	 * @return
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public Object queryStatMessageByModule(Map<String,String> params) {
		
		String tableName = getTableName();
		String selectSql = "SELECT send_systemid , business_module,  category, count("+MessageContext.getValue(MessageConstants.CATEGORY)+"_id) as total, sum(case when send_state = '00' then 1 else 0 end) as wait_num,sum(case when send_state = '100' then 1 else 0 end) as success_num,sum(case when send_state = '201' then 1 else 0 end) as retry_num,sum(case when send_state = '200' then 1 else 0 end) as fail_num, sum(case when send_state = '300' then 1 else 0 end) as canl_num  FROM "+tableName+" WHERE 1=1 GROUP BY send_systemid,business_module";

		List<GeneralMessage> queryList = jdbcTemplate.query(selectSql, new BeanPropertyRowMapper(GeneralMessage.class) );
		
		Data<GeneralMessage> dt = new Data<>();
		String sqlCount = "select count(*) from (SELECT COUNT(*) FROM "+tableName+"  WHERE 1=1 GROUP BY send_systemid,business_module) sum_table";
		dt.setTotal((Long)queryCountMessage(sqlCount));
		dt.setRows(queryList);
		
		MessageResult rs = new MessageResult("true", "200", "加载数据成功！", dt);
		
		return rs;
	}
	
	/**
	 * 功能: 当前处理类<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月10日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @return
	 */
	protected abstract String getTableName();
	
	
	/**
	 * 功能: 获取不同的JmsTemplate<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月17日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @return
	 */
	protected abstract JmsTemplate getJmsTemplate();
	/**
	 * 功能: 插入common表<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月14日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @return
	 */
	@SuppressWarnings("rawtypes")
	protected abstract Class getEntityClass();
	
	/**
	 * 功能: 往common_id中添加消息数据<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月21日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param message
	 * @return
	 */
	protected  boolean insertMessageSuperTable(T message) {
		
		SqlContext superSql = SqlTools.getSuperInsert(message);
		int superCount = jdbcTemplate.update(superSql.getSql(), superSql.getParams());
		
        return superCount>0;
	}
	
	/**
	 * 功能: 插入消息表 一定要设置	<br>
	 * 放置父级关联id
	 * message.setCommonId(message.getId())<br>
	 * 作者: yangjingjiang <br>
	 * 创建日期:2017年11月14日 <br>
	 * 修改者: mender <br>
	 * 修改日期: modifydate <br>
	 * @param message
	 * @return
	 */
	protected boolean insertMessageTable(T message) {
		
		// 1.提供一个可以添加的sql语句
        SqlContext sql = SqlTools.getInsert(message);
        // 2.调用jdbcTemplate的update方法进行添加操作
        int count = jdbcTemplate.update(sql.getSql(), sql.getParams());
        
        return count>0;
	}
}
